package com.pl.process;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import com.pl.configuration.KafkaTopicProperties;
import com.pl.event.AbstractMQEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.expression.Expression;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;


/**
 * Publishes {@link AbstractMQEvent}s to Kafka as JSON strings.
 *
 * <p>The instance is configured from a {@link KafkaTopicProperties}: the default topic name,
 * the consumer/producer enable flags and the partition-key expression are all read from it
 * at construction time. An optional {@link ProducerTopicSelector} may pick a different
 * destination topic per event; the first time such a topic is seen it is passed to
 * {@link MQProcessor#checkPartitionedTopic} and remembered in {@link #getTopicCheckMap()}.
 */
public class KafkaMessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageProcessor.class);

    /** Default destination topic, used when no selector is set or it yields no topic. */
    private final String topicName;
    private final boolean enableConsumer;
    private final boolean enableProducer;
    /** Evaluated against each event to obtain the Kafka record key. */
    private final Expression partitionKeyExpression;
    private final KafkaTopicProperties config;
    private final KafkaTemplate<String, String> kafkaTemplate;
    /** May be {@code null}: only the six-argument constructor supplies a container. */
    private final KafkaMessageListenerContainer<String, String> consumer;
    private final MQServiceMatcher serviceMatcher;
    /** May be {@code null}, in which case every event goes to {@link #topicName}. */
    private final ProducerTopicSelector selector;
    private final MQProcessor processor;
    /** JSON mapper obtained from {@link #processor}; used to serialise outgoing events. */
    private final ObjectMapper mapper;
    /** Selector-chosen topics that have already been passed to {@code checkPartitionedTopic}. */
    private final Map<String, Boolean> topicCheckMap = new ConcurrentHashMap<>();

    /**
     * Creates a processor without a topic selector and without a listener container.
     *
     * @param processor      owning processor; supplies the {@link ObjectMapper}, must not be {@code null}
     * @param kafkaTemplate  template used to send records
     * @param serviceMatcher decides whether an event originates from this service
     * @param topicConfig    topic settings; must not be {@code null} and must have a producer section
     */
    public KafkaMessageProcessor(MQProcessor processor, KafkaTemplate<String, String> kafkaTemplate, MQServiceMatcher serviceMatcher, KafkaTopicProperties topicConfig) {
        this(processor, kafkaTemplate, serviceMatcher, topicConfig, null);
    }

    /**
     * Creates a processor with an optional topic selector and without a listener container.
     *
     * @param processor      owning processor; supplies the {@link ObjectMapper}, must not be {@code null}
     * @param kafkaTemplate  template used to send records
     * @param serviceMatcher decides whether an event originates from this service
     * @param topicConfig    topic settings; must not be {@code null} and must have a producer section
     * @param selector       per-event topic selector, or {@code null} to always use the configured topic
     */
    public KafkaMessageProcessor(MQProcessor processor, KafkaTemplate<String, String> kafkaTemplate, MQServiceMatcher serviceMatcher, KafkaTopicProperties topicConfig, ProducerTopicSelector selector) {
        this(processor, kafkaTemplate, serviceMatcher, topicConfig, null, selector);
    }

    /**
     * Creates a fully specified processor.
     *
     * @param processor      owning processor; supplies the {@link ObjectMapper}, must not be {@code null}
     * @param kafkaTemplate  template used to send records
     * @param serviceMatcher decides whether an event originates from this service
     * @param topicConfig    topic settings; must not be {@code null} and must have a producer section
     * @param consumer       listener container associated with this topic, may be {@code null}
     * @param selector       per-event topic selector, or {@code null} to always use the configured topic
     */
    public KafkaMessageProcessor(MQProcessor processor, KafkaTemplate<String, String> kafkaTemplate, MQServiceMatcher serviceMatcher, KafkaTopicProperties topicConfig, KafkaMessageListenerContainer<String, String> consumer, ProducerTopicSelector selector) {
        this.config = topicConfig;
        this.enableConsumer = topicConfig.isEnableConsumer();
        this.enableProducer = topicConfig.isEnableProducer();
        this.topicName = topicConfig.getTopicName();
        this.partitionKeyExpression = topicConfig.getProducer().getPartitionKeyExpression();

        this.kafkaTemplate = kafkaTemplate;
        this.serviceMatcher = serviceMatcher;
        this.consumer = consumer;
        this.selector = selector;

        this.processor = processor;
        this.mapper = processor.getObjectMapper();
    }

    /** @return the default topic name read from the topic configuration */
    public String getTopicName() {
        return this.topicName;
    }

    /** @return the consumer-enabled flag read from the topic configuration */
    public boolean isEnableConsumer() {
        return this.enableConsumer;
    }

    /** @return the producer-enabled flag read from the topic configuration */
    public boolean isEnableProducer() {
        return this.enableProducer;
    }

    /** @return the expression evaluated against an event to obtain the record key */
    public Expression getPartitionKeyExpression() {
        return this.partitionKeyExpression;
    }

    /** @return the topic configuration this processor was built from */
    public KafkaTopicProperties getConfig() {
        return this.config;
    }

    /** @return the template used to send records */
    public KafkaTemplate<String, String> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    /** @return the listener container passed at construction, or {@code null} if none was given */
    public KafkaMessageListenerContainer<String, String> getConsumer() {
        return this.consumer;
    }

    /** @return the matcher used to decide whether an event originates from this service */
    public MQServiceMatcher getServiceMatcher() {
        return this.serviceMatcher;
    }

    /** @return the per-event topic selector, or {@code null} if none was given */
    public ProducerTopicSelector getSelector() {
        return this.selector;
    }

    /** @return the owning processor */
    public MQProcessor getProcessor() {
        return this.processor;
    }

    /** @return the JSON mapper obtained from the owning processor */
    public ObjectMapper getMapper() {
        return this.mapper;
    }

    /**
     * @return the live (not copied) map of selector-chosen topics that have already been
     *         passed to {@link MQProcessor#checkPartitionedTopic}
     */
    public Map<String, Boolean> getTopicCheckMap() {
        return this.topicCheckMap;
    }

    /**
     * Sends the event to Kafka if it originates from this service and producing is enabled;
     * otherwise does nothing.
     *
     * <p>The record key is obtained by evaluating the partition-key expression against the
     * event. The destination is the topic chosen by the selector when one is configured and
     * returns a non-null topic, and the configured default topic otherwise. The payload is
     * the event serialised to JSON.
     *
     * <p>This method does not wait on the future returned by the template; a failure
     * reported through that future is logged at WARN level.
     *
     * @param event the event to publish
     * @throws JsonProcessingException if the event cannot be serialised to JSON
     * @throws ExecutionException      declared for callers; not raised directly in this method
     * @throws InterruptedException    declared for callers; not raised directly in this method
     */
    public void sendEvent(AbstractMQEvent event) throws JsonProcessingException, ExecutionException, InterruptedException {
        if (!(this.serviceMatcher.isFromSelf(event) && this.enableProducer)) {
            return;
        }

        String key = this.partitionKeyExpression.getValue(event, String.class);

        String topic = null;
        if (this.selector != null) {
            topic = this.selector.producerTopic(event);
            // ConcurrentHashMap rejects null keys, so a selector that declines to pick a
            // topic must skip the cache and fall through to the default topic below.
            // The check and the put are not atomic, so concurrent callers may each run
            // checkPartitionedTopic once for the same new topic.
            if (topic != null && !this.topicCheckMap.containsKey(topic)) {
                this.processor.checkPartitionedTopic(topic);
                this.topicCheckMap.put(topic, Boolean.TRUE);
            }
        }
        if (topic == null) {
            topic = this.topicName;
        }

        ListenableFuture<SendResult<String, String>> sendResult =
                this.kafkaTemplate.send(topic, key, this.mapper.writeValueAsString(event));

        sendResult.completable().whenComplete((v, t) -> {
            if (t != null) {
                log.warn("message: {}", event);
                log.warn("send message error", t);
            }
        });
    }
}


/* Location:              E:\code\common\cmsr-common-mq-kafka\1.0.0-SNAPSHOT\cmsr-common-mq-kafka-1.0.0-20220121.074640-26.jar!\com\cmsr\common\mq\kafka\process\KafkaMessageProcessor.class
 * Java compiler version: 8 (52.0)
 * JD-Core Version:       1.1.3
 */